Coverage Report

Created: 2026-03-18 12:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
D:\a\scloud-dns\scloud-dns\src\workers\manager\channels_generation.rs
Line
Count
Source
1
    use crate::exceptions::SCloudException;
2
    use crate::workers::{SCloudWorker, WorkerType};
3
    use std::collections::HashMap;
4
    use std::sync::Arc;
5
    use tokio::sync::mpsc;
6
7
0
    pub(crate) async fn generate_channels(
8
0
        workers: Vec<Arc<SCloudWorker>>,
9
0
    ) -> Result<(), SCloudException> {
10
0
        let mut wl: HashMap<&str, Vec<Arc<SCloudWorker>>> = HashMap::new();
11
0
        for w in workers {
12
0
            let key = match &w.get_worker_type() {
13
0
                WorkerType::LISTENER => "listener",
14
0
                WorkerType::DECODER => "decoder",
15
0
                WorkerType::QUERY_DISPATCHER => "query-dispatcher",
16
0
                WorkerType::CACHE_LOOKUP => "cache-lookup",
17
0
                WorkerType::ZONE_MANAGER => "zone-manager",
18
0
                WorkerType::RESOLVER => "resolver",
19
0
                WorkerType::CACHE_WRITER => "cache-writer",
20
0
                WorkerType::ENCODER => "encoder",
21
0
                WorkerType::SENDER => "sender",
22
0
                WorkerType::CACHE_JANITOR => "cache-janitor",
23
0
                WorkerType::METRICS => "metrics",
24
0
                WorkerType::TCP_ACCEPTOR => "tcp-acceptor",
25
0
                WorkerType::NONE => "none",
26
            };
27
0
            wl.entry(key).or_insert_with(Vec::new).push(Arc::clone(&w));
28
        }
29
30
0
        let default_worker = vec![Arc::new(SCloudWorker::new(WorkerType::NONE)?)];
31
0
        let tcp_acceptor = wl.get("tcp-acceptor").unwrap_or(&default_worker);
32
0
        let decoder = wl.get("decoder").unwrap_or(&default_worker);
33
0
        let query_dispatcher = wl.get("query-dispatcher").unwrap_or(&default_worker);
34
0
        let cache_lookup = wl.get("cache-lookup").unwrap_or(&default_worker);
35
0
        let cache_writers = wl.get("cache-writer").unwrap_or(&default_worker);
36
0
        let zone_manager = wl.get("zone-manager").unwrap_or(&default_worker);
37
0
        let resolvers = wl.get("resolver").unwrap_or(&default_worker);
38
0
        let encoders = wl.get("encoder").unwrap_or(&default_worker);
39
0
        let senders = wl.get("sender").unwrap_or(&default_worker);
40
41
        // Helper: wire N producers -> M consumers
42
        // Each producer gets M senders, each consumer gets N receivers
43
0
        async fn wire(
44
0
            producers: &[Arc<SCloudWorker>],
45
0
            consumers: &[Arc<SCloudWorker>],
46
0
            capacity: usize,
47
0
        ) {
48
0
            for p in producers {
49
0
                let mut txs = Vec::new();
50
0
                for c in consumers {
51
0
                    let (tx, rx) = mpsc::channel(capacity);
52
0
                    c.push_dns_rx(rx).await;
53
0
                    txs.push(tx);
54
                }
55
0
                p.push_dns_tx_many(txs).await;
56
            }
57
0
        }
58
59
0
        wire(tcp_acceptor, decoder, 1024).await;
60
0
        wire(decoder, cache_lookup, 1024).await;
61
0
        wire(cache_lookup, cache_writers, 1024).await;    // tx[0] = miss path
62
0
        wire(cache_lookup, query_dispatcher, 1024).await; // tx[1] = hit path
63
0
        wire(query_dispatcher, zone_manager, 1024).await;
64
0
        wire(query_dispatcher, resolvers, 1024).await;
65
0
        wire(zone_manager, cache_writers, 1024).await;
66
0
        wire(resolvers, cache_writers, 1024).await;
67
0
        wire(cache_writers, encoders, 1024).await;
68
0
        wire(encoders, senders, 1024).await;
69
70
0
        Ok(())
71
0
    }